Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,16 @@ package io.seqera.wave.service.blob.impl

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.micronaut.context.annotation.Property
import io.micronaut.context.annotation.Requires
import io.micronaut.core.annotation.Nullable
import io.seqera.wave.configuration.BlobCacheConfig
import io.seqera.wave.core.ContainerPlatform
import io.seqera.wave.service.blob.TransferStrategy
import io.seqera.wave.service.k8s.K8sService
import jakarta.inject.Inject
import static io.seqera.wave.util.K8sHelper.getSelectorLabel

/**
* Implements {@link TransferStrategy} that runs s5cmd using a
* Kubernetes job
Expand All @@ -44,10 +49,17 @@ class KubeTransferStrategy implements TransferStrategy {
@Inject
private K8sService k8sService

@Property(name='wave.build.k8s.node-selector')
@Nullable
private Map<String, String> nodeSelectorMap

@Override
void launchJob(String jobName, List<String> command) {

final selector = getSelectorLabel(ContainerPlatform.DEFAULT, nodeSelectorMap)

// run the transfer job
k8sService.launchTransferJob(jobName, blobConfig.s5Image, command, blobConfig)
k8sService.launchTransferJob(jobName, blobConfig.s5Image, command, blobConfig, selector)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class KubeBuildStrategy extends BuildStrategy {
final buildImage = getBuildImage(req)
final buildCmd = launchCmd(req)
final timeout = req.maxDuration ?: buildConfig.defaultTimeout
final selector= getSelectorLabel(req.platform, nodeSelectorMap)
final selector = getSelectorLabel(req.platform, nodeSelectorMap)
k8sService.launchBuildJob(jobName, buildImage, buildCmd, req.workDir, configFile, timeout, selector)
}
catch (ApiException e) {
Expand Down
6 changes: 3 additions & 3 deletions src/main/groovy/io/seqera/wave/service/k8s/K8sService.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@ interface K8sService {

void deleteJob(String name)

V1Job launchTransferJob(String name, String containerImage, List<String> args, BlobCacheConfig blobConfig)
V1Job launchTransferJob(String name, String containerImage, List<String> args, BlobCacheConfig blobConfig, Map<String,String> nodeSelector)

V1Job launchBuildJob(String name, String containerImage, List<String> args, Path workDir, Path creds, Duration timeout, Map<String,String> nodeSelector)

V1Job launchScanJob(String name, String containerImage, List<String> args, Path workDir, Path creds, ScanConfig scanConfig)
V1Job launchScanJob(String name, String containerImage, List<String> args, Path workDir, Path creds, ScanConfig scanConfig, Map<String,String> nodeSelector)

V1Job launchMirrorJob(String name, String containerImage, List<String> args, Path workDir, Path creds, MirrorConfig config)
V1Job launchMirrorJob(String name, String containerImage, List<String> args, Path workDir, Path creds, MirrorConfig config, Map<String,String> nodeSelector)

V1Pod getLatestPodForJob(String jobName)

Expand Down
21 changes: 12 additions & 9 deletions src/main/groovy/io/seqera/wave/service/k8s/K8sServiceImpl.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,8 @@ class K8sServiceImpl implements K8sService {
* The {@link V1Job} description the submitted job
*/
@Override
V1Job launchTransferJob(String name, String containerImage, List<String> args, BlobCacheConfig blobConfig) {
final spec = createTransferJobSpec(name, containerImage, args, blobConfig)
V1Job launchTransferJob(String name, String containerImage, List<String> args, BlobCacheConfig blobConfig, Map<String,String> nodeSelector) {
final spec = createTransferJobSpec(name, containerImage, args, blobConfig, nodeSelector)

return k8sClient
.batchV1Api()
Expand All @@ -436,7 +436,7 @@ class K8sServiceImpl implements K8sService {
: null
}

V1Job createTransferJobSpec(String name, String containerImage, List<String> args, BlobCacheConfig blobConfig) {
V1Job createTransferJobSpec(String name, String containerImage, List<String> args, BlobCacheConfig blobConfig, Map<String,String> nodeSelector) {

V1JobBuilder builder = new V1JobBuilder()

Expand Down Expand Up @@ -466,6 +466,7 @@ class K8sServiceImpl implements K8sService {
.withRestartPolicy("Never")
.withDnsConfig(dnsConfig())
.withDnsPolicy(dnsPolicy)
.withNodeSelector(nodeSelector)
//container section
.addNewContainer()
.withName(name)
Expand Down Expand Up @@ -627,15 +628,15 @@ class K8sServiceImpl implements K8sService {
}

@Override
V1Job launchScanJob(String name, String containerImage, List<String> args, Path workDir, Path creds, ScanConfig scanConfig) {
final spec = scanJobSpec(name, containerImage, args, workDir, creds, scanConfig)
V1Job launchScanJob(String name, String containerImage, List<String> args, Path workDir, Path creds, ScanConfig scanConfig, Map<String,String> nodeSelector) {
final spec = scanJobSpec(name, containerImage, args, workDir, creds, scanConfig, nodeSelector)
return k8sClient
.batchV1Api()
.createNamespacedJob(namespace, spec)
.execute()
}

V1Job scanJobSpec(String name, String containerImage, List<String> args, Path workDir, Path credsFile, ScanConfig scanConfig) {
V1Job scanJobSpec(String name, String containerImage, List<String> args, Path workDir, Path credsFile, ScanConfig scanConfig, Map<String,String> nodeSelector) {

final mounts = new ArrayList<V1VolumeMount>(5)
mounts.add(mountBuildStorage(workDir, storageMountPath, false))
Expand Down Expand Up @@ -669,6 +670,7 @@ class K8sServiceImpl implements K8sService {
.addAllToVolumes(volumes)
.withDnsConfig(dnsConfig())
.withDnsPolicy(dnsPolicy)
.withNodeSelector(nodeSelector)

final requests = new V1ResourceRequirements()
if( scanConfig.requestsCpu )
Expand Down Expand Up @@ -702,15 +704,15 @@ class K8sServiceImpl implements K8sService {
}

@Override
V1Job launchMirrorJob(String name, String containerImage, List<String> args, Path workDir, Path creds, MirrorConfig config) {
final spec = mirrorJobSpec(name, containerImage, args, workDir, creds, config)
V1Job launchMirrorJob(String name, String containerImage, List<String> args, Path workDir, Path creds, MirrorConfig config, Map<String,String> nodeSelector) {
final spec = mirrorJobSpec(name, containerImage, args, workDir, creds, config, nodeSelector)
return k8sClient
.batchV1Api()
.createNamespacedJob(namespace, spec)
.execute()
}

V1Job mirrorJobSpec(String name, String containerImage, List<String> args, Path workDir, Path credsFile, MirrorConfig config) {
V1Job mirrorJobSpec(String name, String containerImage, List<String> args, Path workDir, Path credsFile, MirrorConfig config, Map<String,String> nodeSelector) {

// required volumes
final mounts = new ArrayList<V1VolumeMount>(5)
Expand Down Expand Up @@ -744,6 +746,7 @@ class K8sServiceImpl implements K8sService {
.addAllToVolumes(volumes)
.withDnsConfig(dnsConfig())
.withDnsPolicy(dnsPolicy)
.withNodeSelector(nodeSelector)

final requests = new V1ResourceRequirements()
if( config.requestsCpu )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,18 @@ import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.kubernetes.client.openapi.ApiException
import io.micronaut.context.annotation.Primary
import io.micronaut.context.annotation.Property
import io.micronaut.context.annotation.Requires
import io.micronaut.core.annotation.Nullable
import io.seqera.wave.configuration.MirrorConfig
import io.seqera.wave.configuration.MirrorEnabled
import io.seqera.wave.exception.BadRequestException
import io.seqera.wave.service.k8s.K8sService
import io.seqera.wave.service.mirror.MirrorRequest
import jakarta.inject.Inject
import jakarta.inject.Singleton
import static io.seqera.wave.util.K8sHelper.getSelectorLabel

/**
* Implements a container mirror runner based on Kubernetes
*
Expand All @@ -51,10 +55,15 @@ class KubeMirrorStrategy extends MirrorStrategy {
@Inject
private K8sService k8sService

@Property(name='wave.build.k8s.node-selector')
@Nullable
private Map<String, String> nodeSelectorMap

@Override
void mirrorJob(String jobName, MirrorRequest request) {
// docker auth json file
final Path configFile = request.authJson ? request.workDir.resolve('config.json') : null
final selector = getSelectorLabel(request.platform, nodeSelectorMap)

try {
k8sService.launchMirrorJob(
Expand All @@ -63,7 +72,8 @@ class KubeMirrorStrategy extends MirrorStrategy {
copyCommand(request),
request.workDir,
configFile,
config)
config,
selector)
}
catch (ApiException e) {
throw new BadRequestException("Unexpected build failure - ${e.responseBody}", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import io.seqera.wave.configuration.ScanEnabled
import io.seqera.wave.exception.BadRequestException
import io.seqera.wave.service.k8s.K8sService
import jakarta.inject.Singleton
import static io.seqera.wave.util.K8sHelper.getSelectorLabel

/**
* Implements ScanStrategy for Kubernetes
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ class KubeTransferStrategyTest extends Specification {
def podName = "$jobName-abc".toString()
def pod = new V1Pod(metadata: [name: podName, creationTimestamp: OffsetDateTime.now()])
pod.status = new V1PodStatus(phase: "Succeeded")
k8sService.launchTransferJob(_, _, _, _) >> new V1Job(metadata: [name: jobName])
k8sService.launchTransferJob(_, _, _, _, _) >> new V1Job(metadata: [name: jobName])
k8sService.getPod(_) >> pod
k8sService.logsPod(_) >> "transfer successful"

when:
strategy.launchJob(podName, command)

then:
1 * k8sService.launchTransferJob(podName, blobConfig.s5Image, command, blobConfig)
1 * k8sService.launchTransferJob(podName, blobConfig.s5Image, command, blobConfig, _)
}

}
Loading
Loading