Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend observer with onFileStage #5907

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions docs/developer/plugins.md
Original file line number Diff line number Diff line change
Expand Up @@ -351,14 +351,19 @@ class MyObserverFactory implements TraceObserverFactory {
// MyObserver.groovy
import java.nio.file.Path

import nextflow.Session
import nextflow.processor.TaskHandler
import nextflow.trace.TraceObserver
import nextflow.trace.TraceRecord

class MyObserver implements TraceObserver {
private Session session

@Override
void onFlowBegin() {
void onFlowCreate(Session session) {
// store the session for later use
this.session = session

println "Okay, let's begin!"
}

Expand All @@ -372,9 +377,14 @@ class MyObserver implements TraceObserver {
println "I found a task in the cache! It's name is '${handler.task.name}'"
}

@Override
void onFileStage(Path destination, Path source) {
println("I staged a file from '${source.toUriString()}' to '${destination.toUriString()}'")
}

@Override
void onFilePublish(Path destination, Path source) {
println "I published a file! It's located at ${path.toUriString()}"
println "I published a file! It's located at '${destination.toUriString()}'"
}

@Override
Expand Down
12 changes: 12 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/Session.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -1149,6 +1149,18 @@ class Session implements ISession {
}
}

void notifyFileStage(Path destination, Path source=null) {
def copy = new ArrayList<TraceObserver>(observers)
for( TraceObserver observer : copy ) {
try {
observer.onFileStage(destination, source)
}
catch( Exception e ) {
log.error "Failed to invoke observer on file stage: $observer", e
}
}
}

void notifyFlowComplete() {
def copy = new ArrayList<TraceObserver>(observers)
for( TraceObserver observer : copy ) {
Expand Down
10 changes: 7 additions & 3 deletions modules/nextflow/src/main/groovy/nextflow/file/FilePorter.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class FilePorter {
}

protected FileTransfer createFileTransfer(Path source, Path target) {
return new FileTransfer(source, target, maxRetries, semaphore)
return new FileTransfer(source, target, maxRetries, semaphore, session)
}

protected FileTransfer getOrSubmit(FileCopy copy) {
Expand Down Expand Up @@ -281,15 +281,17 @@ class FilePorter {
volatile Future result
private String message
private int debugDelay
final private Session session

FileTransfer(Path foreignPath, Path stagePath, int maxRetries, Semaphore semaphore) {
FileTransfer(Path foreignPath, Path stagePath, int maxRetries, Semaphore semaphore, Session session) {
this.semaphore = semaphore
this.source = foreignPath
this.target = stagePath
this.maxRetries = maxRetries
this.message = "Staging foreign file: ${source.toUriString()}"
this.refCount = new AtomicInteger(0)
this.debugDelay = System.getProperty('filePorter.debugDelay') as Integer ?: 0
this.session = session
}

@Override
Expand Down Expand Up @@ -325,7 +327,9 @@ class FilePorter {
int count = 0
while( true ) {
try {
return stageForeignFile0(filePath, stagePath)
def output = stageForeignFile0(filePath, stagePath)
this.session.notifyFileStage(stagePath, filePath)
return output
}
catch( IOException e ) {
// remove the target file that could be have partially downloaded
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,15 @@ trait TraceObserver {
void onFilePublish(Path destination, Path source){
onFilePublish(destination)
}

/**
* Method that is invoke when an output file is staged
* into the work directory.
*
* @param destination
* The destination path at staging folder.
* @param source
* The source remote source path.
*/
void onFileStage(Path destination, Path source){}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,18 +133,6 @@ class FilePorterTest extends Specification {



static class ErrorStage extends FilePorter.FileTransfer {

ErrorStage(Path path, Path stagePath, int maxRetries) {
super(path, stagePath, maxRetries, new Semaphore(100))
}

@Override
void run() throws Exception {
throw new ProcessStageException('Cannot stage file')
}
}

def 'should submit actions' () {

given:
Expand Down Expand Up @@ -283,11 +271,14 @@ class FilePorterTest extends Specification {
}
def 'should stage a file' () {
given:
def sess = Mock(Session) {
getConfig() >> [:]
}
def folder = Files.createTempDirectory('test')
def local1 = folder.resolve('hola.text')
def foreign1 = TestHelper.createInMemTempFile('hola.txt', 'hola mundo!')
and:
def porter = new FilePorter.FileTransfer(foreign1, local1, 0, Mock(Semaphore))
def porter = new FilePorter.FileTransfer(foreign1, local1, 0, Mock(Semaphore), sess)

when:
porter.stageForeignFile(foreign1, local1)
Expand All @@ -309,13 +300,16 @@ class FilePorterTest extends Specification {

def 'should check valid files' () {
given:
def sess = Mock(Session) {
getConfig() >> [:]
}
def CONTENT = 'hola mundo!'
def foreign1 = TestHelper.createInMemTempFile('hola.txt', CONTENT)
and:
def folder = Files.createTempDirectory('test')
def local1 = folder.resolve('hola.text'); local1.text = CONTENT
and:
def porter = new FilePorter.FileTransfer(foreign1, local1, 0, Mock(Semaphore))
def porter = new FilePorter.FileTransfer(foreign1, local1, 0, Mock(Semaphore), sess)

when:
def equals = porter.checkPathIntegrity(foreign1, local1)
Expand Down
Loading