Skip to content

Commit

Permalink
Improve S3 file upload/download via Transfer manager
Browse files Browse the repository at this point in the history
This commit allows the use of the AWS Transfer Manager when
when copying S3 files from and to an S3 bucket. 

The use of the "native" transfer provides better performance and stability  

Signed-off-by: Jorge Aguilera <[email protected]>
Signed-off-by: Paolo Di Tommaso <[email protected]>
Co-authored-by: Paolo Di Tommaso <[email protected]>
  • Loading branch information
jorgeaguileraseqera and pditommaso authored Jul 17, 2022
1 parent 1c23b40 commit 7e8d2a5
Show file tree
Hide file tree
Showing 24 changed files with 993 additions and 247 deletions.
85 changes: 11 additions & 74 deletions modules/nextflow/src/main/groovy/nextflow/Session.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ import java.nio.file.Paths
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.function.Consumer

import com.google.common.hash.HashCode
import groovy.transform.CompileDynamic
Expand All @@ -50,6 +49,7 @@ import nextflow.executor.ExecutorFactory
import nextflow.extension.CH
import nextflow.file.FileHelper
import nextflow.file.FilePorter
import nextflow.file.FileTransferPool
import nextflow.plugin.Plugins
import nextflow.processor.ErrorStrategy
import nextflow.processor.TaskFault
Expand All @@ -73,7 +73,6 @@ import nextflow.util.ConfigHelper
import nextflow.util.Duration
import nextflow.util.HistoryFile
import nextflow.util.NameGenerator
import nextflow.util.ThreadPoolBuilder
import nextflow.util.VersionNumber
import sun.misc.Signal
import sun.misc.SignalHandler
Expand Down Expand Up @@ -630,7 +629,10 @@ class Session implements ISession {
void destroy() {
try {
log.trace "Session > destroying"

// note: the file transfer pool must be terminated before
// invoking the shutdown callback to prevent depending pool (e.g. s3 transfer pool)
// are terminated while some file still needs to be download/uploaded
FileTransferPool.shutdown(aborted)
// invoke shutdown callbacks
shutdown0()
log.trace "Session > after cleanup"
Expand Down Expand Up @@ -904,13 +906,17 @@ class Session implements ISession {
* Register a shutdown hook to close services when the session terminates
* @param Closure
*/
void onShutdown( Closure shutdown ) {
void onShutdown( Closure<Void> shutdown ) {
if( !shutdown )
return

shutdownCallbacks << shutdown
}

void onShutdown( Consumer<Object> callback ) {
onShutdown( { callback.accept(it) } )
}

void notifyProcessCreate(TaskProcessor process) {
for( int i=0; i<observers.size(); i++ ) {
final observer = observers.get(i)
Expand Down Expand Up @@ -1338,73 +1344,4 @@ class Session implements ISession {
ansiLogObserver ? ansiLogObserver.appendInfo(file.text) : Files.copy(file, System.out)
}

@Memoized // <-- this guarantees that the same executor is used across different publish dir in the same session
@CompileStatic
synchronized ExecutorService getFileTransferThreadPool() {
final DEFAULT_MIN_THREAD = Math.min(Runtime.runtime.availableProcessors(), 4)
final DEFAULT_MAX_THREAD = DEFAULT_MIN_THREAD
final DEFAULT_QUEUE = 10_000
final DEFAULT_KEEP_ALIVE = Duration.of('60sec')
final DEFAULT_MAX_AWAIT = Duration.of('12 hour')

def minThreads = config.navigate("threadPool.FileTransfer.minThreads", DEFAULT_MIN_THREAD) as Integer
def maxThreads = config.navigate("threadPool.FileTransfer.maxThreads", DEFAULT_MAX_THREAD) as Integer
def maxQueueSize = config.navigate("threadPool.FileTransfer.maxQueueSize", DEFAULT_QUEUE) as Integer
def keepAlive = config.navigate("threadPool.FileTransfer.keepAlive", DEFAULT_KEEP_ALIVE) as Duration
def maxAwait = config.navigate("threadPool.FileTransfer.maxAwait", DEFAULT_MAX_AWAIT) as Duration
def allowThreadTimeout = config.navigate("threadPool.FileTransfer.allowThreadTimeout", false) as Boolean

if( minThreads>maxThreads ) {
log.debug("FileTransfer minThreads ($minThreads) cannot be greater than maxThreads ($maxThreads) - Setting minThreads to $maxThreads")
minThreads = maxThreads
}

final pool = new ThreadPoolBuilder()
.withName('FileTransfer')
.withMinSize(minThreads)
.withMaxSize(maxThreads)
.withQueueSize(maxQueueSize)
.withKeepAliveTime(keepAlive)
.withAllowCoreThreadTimeout(allowThreadTimeout)
.build()

this.onShutdown {
final max = maxAwait.millis
final t0 = System.currentTimeMillis()
// start shutdown process
if( aborted ) {
pool.shutdownNow()
return
}
pool.shutdown()
// wait for ongoing file transfer to complete
int count=0
while( true ) {
final terminated = pool.awaitTermination(5, TimeUnit.SECONDS)
if( terminated )
break

final delta = System.currentTimeMillis()-t0
if( delta > max ) {
log.warn "Exiting before FileTransfer thread pool complete -- Some files maybe lost"
break
}

final p1 = ((ThreadPoolExecutor)pool)
final pending = p1.getTaskCount() - p1.getCompletedTaskCount()
// log to console every 10 minutes (120 * 5 sec)
if( count % 120 == 0 ) {
log.info1 "Waiting files transfer to complete (${pending} files)"
}
// log to the debug file every minute (12 * 5 sec)
else if( count % 12 == 0 ) {
log.debug "Waiting files transfer to complete (${pending} files)"
}
// increment the count
count++
}
}

return pool
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class FilePorter {

final Map<Path,FileTransfer> stagingTransfers = new HashMap<>()

@Lazy private ExecutorService threadPool = session.getFileTransferThreadPool()
@Lazy private ExecutorService threadPool = FileTransferPool.getExecutorService()

private Duration pollTimeout

Expand Down
110 changes: 110 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/file/FileTransferPool.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright 2020-2022, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package nextflow.file

import java.util.concurrent.ExecutorService
import java.util.concurrent.ThreadPoolExecutor

import groovy.transform.CompileStatic
import groovy.transform.Memoized
import groovy.util.logging.Slf4j
import nextflow.Global
import nextflow.util.Duration
import nextflow.util.ThreadPoolBuilder
import nextflow.util.ThreadPoolHelper

/**
* Holder object for file transfrer thread pool
*
* @author Paolo Di Tommaso <[email protected]>
*/
@Slf4j
@CompileStatic
class FileTransferPool {

final static private DEFAULT_MIN_THREAD = 1
final static private DEFAULT_MAX_THREAD = Math.min(Runtime.runtime.availableProcessors()*2, 10)
final static private DEFAULT_QUEUE = 10_000
final static private DEFAULT_KEEP_ALIVE = Duration.of('60sec')
final DEFAULT_MAX_AWAIT = Duration.of('12 hour')

final private Integer minThreads
final private Integer maxThreads
final private Integer maxQueueSize
final private Duration keepAlive
final private Boolean allowThreadTimeout
final private Duration maxAwait
final private ThreadPoolExecutor executorService

static private FileTransferPool instance0

FileTransferPool(Map config) {
this.minThreads = config.navigate("threadPool.FileTransfer.minThreads", DEFAULT_MIN_THREAD) as Integer
this.maxThreads = config.navigate("threadPool.FileTransfer.maxThreads", DEFAULT_MAX_THREAD) as Integer
this.maxQueueSize = config.navigate("threadPool.FileTransfer.maxQueueSize", DEFAULT_QUEUE) as Integer
this.keepAlive = config.navigate("threadPool.FileTransfer.keepAlive", DEFAULT_KEEP_ALIVE) as Duration
this.allowThreadTimeout = config.navigate("threadPool.FileTransfer.allowThreadTimeout", false) as Boolean
this.maxAwait = config.navigate("threadPool.FileTransfer.maxAwait", DEFAULT_MAX_AWAIT) as Duration

if( minThreads>maxThreads ) {
log.debug("FileTransfer minThreads ($minThreads) cannot be greater than maxThreads ($maxThreads) - Setting minThreads to $maxThreads")
minThreads = maxThreads
}

executorService = new ThreadPoolBuilder()
.withName('FileTransfer')
.withMinSize(minThreads)
.withMaxSize(maxThreads)
.withQueueSize(maxQueueSize)
.withKeepAliveTime(keepAlive)
.withAllowCoreThreadTimeout(allowThreadTimeout)
.build()
}

private ExecutorService getExecutorService0() {
return executorService
}

private void shutdown0(boolean hard) {
if( hard ) {
executorService.shutdownNow()
return
}

executorService.shutdown()
// wait for ongoing file transfer to complete
final waitMsg = "Waiting files transfer to complete (%d files)"
final exitMsg = "Exiting before FileTransfer thread pool complete -- Some files maybe lost"
ThreadPoolHelper.await(executorService, maxAwait, waitMsg, exitMsg)
}

@Memoized
static synchronized ExecutorService getExecutorService() {
final session = Global.session
if( session == null )
throw new IllegalStateException("Nextflow session object has not been created yet")
instance0 = new FileTransferPool(session.getConfig())
return instance0.getExecutorService0()
}

static shutdown(boolean hard) {
if( instance0 )
instance0.shutdown0(hard)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import nextflow.NF
import nextflow.Session
import nextflow.extension.FilesEx
import nextflow.file.FileHelper
import nextflow.file.FileTransferPool
import nextflow.file.TagAwareFile
import nextflow.util.PathTrie
/**
Expand Down Expand Up @@ -109,7 +110,7 @@ class PublishDir {
private String taskName

@Lazy
private ExecutorService threadPool = (Global.session as Session).getFileTransferThreadPool()
private ExecutorService threadPool = FileTransferPool.getExecutorService()

void setPath( Closure obj ) {
setPath( obj.call() as Path )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import static nextflow.Const.*

import java.lang.reflect.Field
import java.nio.file.DirectoryNotEmptyException
import java.nio.file.FileAlreadyExistsException
import java.nio.file.NoSuchFileException
import java.nio.file.Path
import java.util.concurrent.atomic.AtomicBoolean
Expand Down Expand Up @@ -457,6 +458,9 @@ class LoggerHelper {
else if( fail instanceof NoSuchFileException ) {
buffer.append("No such file: ${normalize(fail.message)}")
}
else if( fail instanceof FileAlreadyExistsException ) {
buffer.append("File already exist: $fail.message")
}
else if( fail instanceof ClassNotFoundException ) {
buffer.append("Class not found: ${normalize(fail.message)}")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class SimpleAgent<T> {
throw new IllegalArgumentException("Invalid agent event object: $ev [${ev.getClass().getName()}]")
}
catch (InterruptedException e) {
log.debug "Got an interrupeted exception while polling agent event | ${e.message ?: e}"
log.debug "Got an interrupted exception while polling agent event | ${e.message ?: e}"
break
}
catch(Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2020-2022, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package nextflow.util

import java.util.concurrent.ExecutorService
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
/**
* Thread pool helpers
*
* @author Paolo Di Tommaso <[email protected]>
*/
@CompileStatic
@Slf4j
class ThreadPoolHelper {

static void await(ExecutorService pool, Duration maxAwait, String waitMessage, String exitMsg) {
final max = maxAwait.millis
final t0 = System.currentTimeMillis()
// wait for ongoing file transfer to complete
int count=0
while( true ) {
final terminated = pool.awaitTermination(5, TimeUnit.SECONDS)
if( terminated )
break

final delta = System.currentTimeMillis()-t0
if( delta > max ) {
log.warn(exitMsg)
break
}

final p1 = ((ThreadPoolExecutor)pool)
final pending = p1.getTaskCount() - p1.getCompletedTaskCount()
// log to console every 10 minutes (120 * 5 sec)
if( count % 120 == 0 ) {
log.info1(String.format(waitMessage, pending))
}
// log to the debug file every minute (12 * 5 sec)
else if( count % 12 == 0 ) {
log.debug(String.format(waitMessage, pending))
}
// increment the count
count++
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@

package nextflow.processor


import java.nio.file.FileSystems
import java.nio.file.Files
import java.nio.file.Paths
import java.util.concurrent.TimeUnit

import nextflow.Session
import nextflow.file.FileTransferPool
import spock.lang.Specification
import test.TestHelper
/**
Expand Down Expand Up @@ -181,8 +180,7 @@ class PublishDirTest extends Specification {
def publisher = new PublishDir(path: publishDir, mode: 'copy')
publisher.apply( outputs, task )

session.fileTransferThreadPool.shutdown()
session.fileTransferThreadPool.awaitTermination(5, TimeUnit.SECONDS)
FileTransferPool.shutdown(false)

then:
publishDir.resolve('file1.txt').text == 'aaa'
Expand Down
Loading

0 comments on commit 7e8d2a5

Please sign in to comment.