diff --git a/docs/developer/plugins.md b/docs/developer/plugins.md index 7ea5a779e5..089713f0b6 100644 --- a/docs/developer/plugins.md +++ b/docs/developer/plugins.md @@ -351,14 +351,19 @@ class MyObserverFactory implements TraceObserverFactory { // MyObserver.groovy import java.nio.file.Path +import nextflow.Session import nextflow.processor.TaskHandler import nextflow.trace.TraceObserver import nextflow.trace.TraceRecord class MyObserver implements TraceObserver { + private Session session @Override - void onFlowBegin() { + void onFlowCreate(Session session) { + // store the session for later use + this.session = session + println "Okay, let's begin!" } @@ -372,9 +377,14 @@ class MyObserver implements TraceObserver { println "I found a task in the cache! It's name is '${handler.task.name}'" } + @Override + void onFileStage(Path destination, Path source) { + println("I staged a file from '${source.toUriString()}' to '${destination.toUriString()}'") + } + @Override void onFilePublish(Path destination, Path source) { - println "I published a file! It's located at ${path.toUriString()}" + println "I published a file! It's located at '${destination.toUriString()}'" } @Override diff --git a/modules/nextflow/src/main/groovy/nextflow/Session.groovy b/modules/nextflow/src/main/groovy/nextflow/Session.groovy index 09152bc301..99628d35ba 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Session.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Session.groovy @@ -1149,6 +1149,18 @@ class Session implements ISession { } } + void notifyFileStage(Path destination, Path source=null) { + def copy = new ArrayList(observers) + for( TraceObserver observer : copy ) { + try { + observer.onFileStage(destination, source) + } + catch( Exception e ) { + log.error "Failed to invoke observer on file stage: $observer", e + } + } + } + void notifyFlowComplete() { def copy = new ArrayList(observers) for( TraceObserver observer : copy ) { diff --git a/modules/nextflow/src/main/groovy/nextflow/file/FilePorter.groovy b/modules/nextflow/src/main/groovy/nextflow/file/FilePorter.groovy index 89e720bc7b..81f81e7ea8 100644 --- a/modules/nextflow/src/main/groovy/nextflow/file/FilePorter.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/file/FilePorter.groovy @@ -105,7 +105,7 @@ class FilePorter { } protected FileTransfer createFileTransfer(Path source, Path target) { - return new FileTransfer(source, target, maxRetries, semaphore) + return new FileTransfer(source, target, maxRetries, semaphore, session) } protected FileTransfer getOrSubmit(FileCopy copy) { @@ -281,8 +281,9 @@ class FilePorter { volatile Future result private String message private int debugDelay + final private Session session - FileTransfer(Path foreignPath, Path stagePath, int maxRetries, Semaphore semaphore) { + FileTransfer(Path foreignPath, Path stagePath, int maxRetries, Semaphore semaphore, Session session) { this.semaphore = semaphore this.source = foreignPath this.target = stagePath @@ -290,6 +291,7 @@ class FilePorter { this.message = "Staging foreign file: ${source.toUriString()}" this.refCount = new AtomicInteger(0) this.debugDelay = System.getProperty('filePorter.debugDelay') as Integer ?: 0 + this.session = session } @Override @@ -325,7 +327,9 @@ class FilePorter { int count = 0 while( true ) { try { - return stageForeignFile0(filePath, stagePath) + def output = stageForeignFile0(filePath, stagePath) + this.session.notifyFileStage(stagePath, filePath) + return output } catch( IOException e ) { // remove the target file that could be have partially downloaded diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserver.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserver.groovy index 16f844643d..7c1dcfbfcc 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserver.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/TraceObserver.groovy @@ -150,4 +150,15 @@ trait TraceObserver { void onFilePublish(Path destination, Path source){ onFilePublish(destination) } + + /** + * Method that is invoke when an output file is staged + * into the work directory. + * + * @param destination + * The destination path at staging folder. + * @param source + * The source remote source path. + */ + void onFileStage(Path destination, Path source){} } diff --git a/modules/nextflow/src/test/groovy/nextflow/file/FilePorterTest.groovy b/modules/nextflow/src/test/groovy/nextflow/file/FilePorterTest.groovy index 6515629439..df40815606 100644 --- a/modules/nextflow/src/test/groovy/nextflow/file/FilePorterTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/file/FilePorterTest.groovy @@ -133,18 +133,6 @@ class FilePorterTest extends Specification { - static class ErrorStage extends FilePorter.FileTransfer { - - ErrorStage(Path path, Path stagePath, int maxRetries) { - super(path, stagePath, maxRetries, new Semaphore(100)) - } - - @Override - void run() throws Exception { - throw new ProcessStageException('Cannot stage file') - } - } - def 'should submit actions' () { given: @@ -283,11 +271,14 @@ class FilePorterTest extends Specification { } def 'should stage a file' () { given: + def sess = Mock(Session) { + getConfig() >> [:] + } def folder = Files.createTempDirectory('test') def local1 = folder.resolve('hola.text') def foreign1 = TestHelper.createInMemTempFile('hola.txt', 'hola mundo!') and: - def porter = new FilePorter.FileTransfer(foreign1, local1, 0, Mock(Semaphore)) + def porter = new FilePorter.FileTransfer(foreign1, local1, 0, Mock(Semaphore), sess) when: porter.stageForeignFile(foreign1, local1) @@ -309,13 +300,16 @@ class FilePorterTest extends Specification { def 'should check valid files' () { given: + def sess = Mock(Session) { + getConfig() >> [:] + } def CONTENT = 'hola mundo!' def foreign1 = TestHelper.createInMemTempFile('hola.txt', CONTENT) and: def folder = Files.createTempDirectory('test') def local1 = folder.resolve('hola.text'); local1.text = CONTENT and: - def porter = new FilePorter.FileTransfer(foreign1, local1, 0, Mock(Semaphore)) + def porter = new FilePorter.FileTransfer(foreign1, local1, 0, Mock(Semaphore), sess) when: def equals = porter.checkPathIntegrity(foreign1, local1)