Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package nextflow.cloud.aws

import nextflow.cloud.aws.config.AwsBucketConfig
import nextflow.cloud.aws.nio.util.S3AsyncClientConfiguration
import nextflow.cloud.aws.nio.util.S3SyncClientConfiguration
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider
Expand Down Expand Up @@ -209,19 +210,20 @@ class AwsClientFactory {
return CloudWatchLogsClient.builder().region(getRegionObj(region)).credentialsProvider(getCredentialsProvider0()).build()
}

S3Client getS3Client(S3SyncClientConfiguration s3ClientConfig, boolean global = false) {
S3Client getS3Client(S3SyncClientConfiguration s3ClientConfig, String bucketName, boolean global = false) {
final SdkHttpClient.Builder httpClientBuilder = s3ClientConfig.getHttpClientBuilder()
final ClientOverrideConfiguration overrideConfiguration = s3ClientConfig.getClientOverrideConfiguration()
final bucketConfig = config.getBucketConfig(bucketName)
final builder = S3Client.builder()
.crossRegionAccessEnabled(global)
.credentialsProvider(getS3CredentialsProvider())
.credentialsProvider(getS3CredentialsProvider(bucketConfig))
.serviceConfiguration(S3Configuration.builder()
.pathStyleAccessEnabled(config.s3Config.pathStyleAccess)
.pathStyleAccessEnabled(bucketConfig.s3PathStyleAccess)
.multiRegionEnabled(global)
.build())

if( config.s3Config.endpoint )
builder.endpointOverride(URI.create(config.s3Config.endpoint))
if( bucketConfig.endpoint )
builder.endpointOverride(URI.create(bucketConfig.endpoint))

// AWS SDK v2 region must be always set, even when endpoint is overridden
builder.region(getRegionObj(region))
Expand All @@ -235,14 +237,15 @@ class AwsClientFactory {
return builder.build()
}

S3AsyncClient getS3AsyncClient(S3AsyncClientConfiguration s3ClientConfig, boolean global = false) {
S3AsyncClient getS3AsyncClient(S3AsyncClientConfiguration s3ClientConfig, String bucketName, boolean global = false) {
final bucketConfig = config.getBucketConfig(bucketName)
def builder = S3AsyncClient.crtBuilder()
.crossRegionAccessEnabled(global)
.credentialsProvider(getS3CredentialsProvider())
.forcePathStyle(config.s3Config.pathStyleAccess)
.credentialsProvider(getS3CredentialsProvider(bucketConfig))
.forcePathStyle(bucketConfig.pathStyleAccess)
.region(getRegionObj(region))
if( config.s3Config.endpoint )
builder.endpointOverride(URI.create(config.s3Config.endpoint))
if( bucketConfig.endpoint )
builder.endpointOverride(URI.create(bucketConfig.endpoint))

final retryConfiguration = s3ClientConfig.getCrtRetryConfiguration()
if( retryConfiguration != null )
Expand Down Expand Up @@ -287,8 +290,8 @@ class AwsClientFactory {
*
* @return an AwsCredentialsProvider instance, falling back to anonymous if needed.
*/
private AwsCredentialsProvider getS3CredentialsProvider() {
if ( config.s3Config.anonymous )
private AwsCredentialsProvider getS3CredentialsProvider(AwsBucketConfig configBucket) {
if ( configBucket?.anonymous || config.s3Config.anonymous)
return AnonymousCredentialsProvider.create()
def provider = getCredentialsProvider0()
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,19 +353,24 @@ class AwsBatchExecutor extends Executor implements ExtensionPoint, TaskArrayExec
List<String> getLaunchCommand(String s3WorkDir) {
// the cmd list to launch it
final opts = getAwsOptions()

// We cannot use 's3WorkDir' to extract the bucket-specific CLI arguments. Depending on the Task type (Array or Single),
// it could be an environment variable reference or a real path. In both cases, these argument will be used to upload
// the '.command.log' file to the workingDir bucket.
final bucket = (getWorkDir() as S3Path).bucket
String args = opts.generateUploadCliArgs(bucket)
args = args ? " $args" : ''

final cmd = opts.s5cmdPath
? s5Cmd(s3WorkDir, opts)
: s3Cmd(s3WorkDir, opts)
? s5Cmd(s3WorkDir, opts, args)
: s3Cmd(s3WorkDir, opts, args)
return ['bash','-o','pipefail','-c', cmd.toString()]
}

static String s3Cmd(String workDir, AwsOptions opts) {
static String s3Cmd(String workDir, AwsOptions opts, String args) {
final cli = opts.getAwsCli()
final debug = opts.debug ? ' --debug' : ''
final sse = opts.storageEncryption ? " --sse $opts.storageEncryption" : ''
final kms = opts.storageKmsKeyId ? " --sse-kms-key-id $opts.storageKmsKeyId" : ''
final requesterPays = opts.requesterPays ? ' --request-payer requester' : ''
final aws = "$cli s3 cp --only-show-errors${sse}${kms}${debug}${requesterPays}"
final aws = "$cli s3 cp --only-show-errors${debug}${args}"

/*
* Enhanced signal handling for AWS Batch tasks to fix nested Nextflow execution issues.
Expand Down Expand Up @@ -402,12 +407,8 @@ class AwsBatchExecutor extends Executor implements ExtensionPoint, TaskArrayExec
return cmd
}

static String s5Cmd(String workDir, AwsOptions opts) {
static String s5Cmd(String workDir, AwsOptions opts, String args) {
final cli = opts.getS5cmdPath()
final sse = opts.storageEncryption ? " --sse $opts.storageEncryption" : ''
final kms = opts.storageKmsKeyId ? " --sse-kms-key-id $opts.storageKmsKeyId" : ''
final requesterPays = opts.requesterPays ? ' --request-payer requester' : ''

/*
* Enhanced signal handling for AWS Batch tasks using s5cmd (high-performance S3 client).
* This implementation mirrors the s3Cmd method but uses s5cmd instead of aws-cli for
Expand All @@ -430,7 +431,7 @@ class AwsBatchExecutor extends Executor implements ExtensionPoint, TaskArrayExec
* - Maintains the same signal-responsive background execution pattern
* - Provides real-time logging while allowing proper signal handling
*/
final cmd = "trap \"[[ -n \\\$pid ]] && kill -TERM \\\$pid\" TERM; trap \"{ ret=\$?; $cli cp${sse}${kms}${requesterPays} ${TaskRun.CMD_LOG} ${workDir}/${TaskRun.CMD_LOG}||true; exit \$ret; }\" EXIT; $cli cat ${workDir}/${TaskRun.CMD_RUN} | bash > >(tee ${TaskRun.CMD_LOG}) 2>&1 & pid=\$!; wait \$pid"
final cmd = "trap \"[[ -n \\\$pid ]] && kill -TERM \\\$pid\" TERM; trap \"{ ret=\$?; $cli cp${args} ${TaskRun.CMD_LOG} ${workDir}/${TaskRun.CMD_LOG}||true; exit \$ret; }\" EXIT; $cli cat ${workDir}/${TaskRun.CMD_RUN} | bash > >(tee ${TaskRun.CMD_LOG}) 2>&1 & pid=\$!; wait \$pid"
return cmd
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import java.nio.file.Path

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.cloud.aws.config.AwsConfig
import nextflow.cloud.aws.nio.S3Path
import nextflow.cloud.aws.util.S3BashLib
import nextflow.executor.SimpleFileCopyStrategy
import nextflow.processor.TaskBean
Expand Down Expand Up @@ -93,10 +95,12 @@ class AwsBatchFileCopyStrategy extends SimpleFileCopyStrategy {
*/
@Override
String stageInputFile( Path path, String targetName ) {
def args = path instanceof S3Path ? opts.generateDownloadCliArgs(((S3Path)path).bucket) : ''
args = args ? " $args" : ''
// third param should not be escaped, because it's used in the grep match rule
def stage_cmd = opts.maxTransferAttempts > 1 && !opts.retryMode
? "downloads+=(\"nxf_cp_retry nxf_s3_download s3:/${Escape.path(path)} ${Escape.path(targetName)}\")"
: "downloads+=(\"nxf_s3_download s3:/${Escape.path(path)} ${Escape.path(targetName)}\")"
? "downloads+=(\"nxf_cp_retry nxf_s3_download s3:/${Escape.path(path)} ${Escape.path(targetName)}${args}\")"
: "downloads+=(\"nxf_s3_download s3:/${Escape.path(path)} ${Escape.path(targetName)}${args}\")"
return stage_cmd
}

Expand All @@ -116,12 +120,14 @@ class AwsBatchFileCopyStrategy extends SimpleFileCopyStrategy {
final escape = new ArrayList(outputFiles.size())
for( String it : patterns )
escape.add( Escape.path(it) )
def args = targetDir instanceof S3Path ? opts.generateUploadCliArgs(((S3Path)targetDir).bucket) : ''
args = args ? " $args" : ''

return """\
uploads=()
IFS=\$'\\n'
for name in \$(eval "ls -1d ${escape.join(' ')}" | sort | uniq); do
uploads+=("nxf_s3_upload '\$name' s3:/${Escape.path(targetDir)}")
uploads+=("nxf_s3_upload '\$name' s3:/${Escape.path(targetDir)}${args}")
done
unset IFS
nxf_parallel "\${uploads[@]}"
Expand All @@ -133,34 +139,42 @@ class AwsBatchFileCopyStrategy extends SimpleFileCopyStrategy {
*/
@Override
String touchFile( Path file ) {
"echo start | nxf_s3_upload - s3:/${Escape.path(file)}"
def args = file instanceof S3Path ? opts.generateUploadCliArgs(((S3Path)file).bucket) : ''
args = args ? " $args" : ''
return "echo start | nxf_s3_upload - s3:/${Escape.path(file)}${args}"
}

/**
* {@inheritDoc}
*/
@Override
String fileStr( Path path ) {
Escape.path(path.getFileName())
return Escape.path(path.getFileName())
}

/**
* {@inheritDoc}
*/
@Override
String copyFile( String name, Path target ) {
"nxf_s3_upload ${Escape.path(name)} s3:/${Escape.path(target.getParent())}"
def args = target instanceof S3Path ? opts.generateUploadCliArgs(((S3Path)target).bucket) : ''
args = args ? " $args" : ''
return "nxf_s3_upload ${Escape.path(name)} s3:/${Escape.path(target.getParent())}${args}"
}

static String uploadCmd( String source, Path target ) {
"nxf_s3_upload ${Escape.path(source)} s3:/${Escape.path(target)}"
static String uploadCmd( String source, Path target, Map config) {
def args = target instanceof S3Path && config ? new AwsConfig(config).generateUploadCliArgs(((S3Path)target).bucket) : ''
args = args ? " $args" : ''
return "nxf_s3_upload ${Escape.path(source)} s3:/${Escape.path(target)}${args}"
}

/**
* {@inheritDoc}
*/
String exitFile( Path path ) {
"| nxf_s3_upload - s3:/${Escape.path(path)} || true"
def args = path instanceof S3Path ? opts.generateUploadCliArgs(((S3Path)path).bucket) : ''
args = args ? " $args" : ''
return "| nxf_s3_upload - s3:/${Escape.path(path)}${args} || true"
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package nextflow.cloud.aws.batch

import java.nio.file.Path

import software.amazon.awssdk.services.s3.model.ObjectCannedACL
import groovy.transform.CompileStatic
import groovy.transform.EqualsAndHashCode
import groovy.transform.ToString
Expand Down Expand Up @@ -112,34 +111,13 @@ class AwsOptions implements CloudTransferOptions {
return awsConfig.batchConfig.getDelayBetweenAttempts()
}

String getStorageClass() {
return awsConfig.s3Config.getStorageClass()
}

String getStorageEncryption() {
return awsConfig.s3Config.getStorageEncryption()
}

String getStorageKmsKeyId() {
return awsConfig.s3Config.getStorageKmsKeyId()
}

ObjectCannedACL getS3Acl() {
return awsConfig.s3Config.getS3Acl()
}

Boolean getDebug() {
return awsConfig.s3Config.getDebug()
}

Boolean getRequesterPays() {
return awsConfig.s3Config.getRequesterPays()
}

String getAwsCli() {
def result = getCliPath()
if( !result ) result = 'aws'
if( region ) result += " --region $region"
return result
}

Expand All @@ -164,4 +142,12 @@ class AwsOptions implements CloudTransferOptions {
return awsConfig.batchConfig.terminateUnschedulableJobs
}

String generateUploadCliArgs(String bucketName){
return awsConfig.generateUploadCliArgs(bucketName)
}

String generateDownloadCliArgs(String bucketName){
return awsConfig.generateDownloadCliArgs(bucketName)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2020-2025, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package nextflow.cloud.aws.config

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.config.spec.ConfigOption
import nextflow.script.dsl.Description

/**
* Model AWS S3 bucket config settings
*
* @author Jorge Ejarque <[email protected]>
*/
@Slf4j
@CompileStatic
class AwsBucketConfig extends AwsS3CommonConfig {

@ConfigOption
@Description("""
S3 Bucket specific AWS region (e.g. `us-east-1`).
""")
final String region

AwsBucketConfig(Map opts) {
super(opts)
this.region = opts.region as String
}

Map<String, Object> toBucketConfigMap(){
final map = super.toBucketConfigMap()
if( region ) map.put('region', region)
return map
}
}
Loading
Loading